iT邦幫忙

2023 iThome 鐵人賽

DAY 23
0

今天我們來看看 webSocket("/echo") 裡面所定義的行為

send("Please enter your name")
for (frame in incoming) {
	frame as? Frame.Text ?: continue
	val receivedText = frame.readText()
	if (receivedText.equals("bye", ignoreCase = true)) {
		close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
	} else {
		send(Frame.Text("Hi, $receivedText!"))
	}
}

每段分別是什麼意思。

我們先來看看 send

/**
 * Enqueues a text frame for sending with the specified [content].
 *
 * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
 */
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))

這邊的 Frame

/**
 * A frame received or ready to be sent. It is not reusable and not thread-safe
 * @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
 * @property frameType enum value
 * @property data - a frame content or fragment content
 * @property disposableHandle could be invoked when the frame is processed
 */
public actual sealed class Frame actual constructor(
    public actual val fin: Boolean,
    public actual val frameType: FrameType,
    public actual val data: ByteArray,
    public actual val disposableHandle: DisposableHandle,
    public actual val rsv1: Boolean,
    public actual val rsv2: Boolean,
    public actual val rsv3: Boolean
) 

Frame.Text(content) 則是

public actual constructor(text: String) : this(true, text.toByteArray())

WebSocketSession.send(frame: Frame) 的實作則是

/**
 * Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
 * outgoing channel is already closed, so it is impossible to transfer any message.
 * Frames that were sent after close frame could be silently ignored.
 * Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
 * raw WebSocket session.
 */
public suspend fun send(frame: Frame) {
	outgoing.send(frame)
}

這邊的 outgoing 則是

/**
 * An outgoing frames channel. It could have limited capacity so sending too many frames may lead to suspension at
 * corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason.
 */
public val outgoing: SendChannel<Frame>

看完這段,我們就知道了,send("Please enter your name") 這段會將文字包成一個 Frame 物件,送到 SendChannel 裡面。

至於什麼是 Frame,定義的細節可以看 The websocket connection lifespan and frame structure.
或者 RFC 6455 5.2 Base Framing Protocol 的說明,今天我們先不挖到這麼底層的實作內容。

我們往下看 for (frame in incoming) 的內容,incoming

/**
 * An incoming frames channel.
 * Note that if you use `webSocket` to handle a WebSockets session,
 * the incoming channel doesn't contain control frames such as the ping/pong or close frames.
 * If you need control over control frames, use the `webSocketRaw` function.
 */
public val incoming: ReceiveChannel<Frame>

既然要能夠將 incoming 放在 for 裡面使用,ReceiveChannel 一定有實作 iterator

/**
 * Returns a new iterator to receive elements from this channel using a `for` loop.
 * Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
 * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
 */
public operator fun iterator(): ChannelIterator<E>
/**
 * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
 * from concurrent coroutines.
 */
public interface ChannelIterator<out E> 

這邊我們看過了怎麼讓 for (frame in incoming) 持續的取出 Frame 物件

接著 frame as? Frame.Text ?: continue 試著將 Frame 轉成 Frame.Text

/**
 * Represents an application level text frame.
 * In a RAW web socket session a big text frame could be fragmented
 * (separated into several text frames so they have [fin] = false except the last one).
 * Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character
 * so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
 * Note that usually there is no need to handle fragments unless you have a RAW web socket session.
 */
public actual class Text actual constructor(
	fin: Boolean,
	data: ByteArray,
	rsv1: Boolean,
	rsv2: Boolean,
	rsv3: Boolean
) : Frame(fin, FrameType.TEXT, data, NonDisposableHandle, rsv1, rsv2, rsv3) {

	public actual constructor(fin: Boolean, data: ByteArray) : this(fin, data, false, false, false)

	public actual constructor(text: String) : this(true, text.toByteArray())

	public actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())

	public constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray())
}

如果不能轉換成 Frame.Text 的話,continue 取得下一個 Frame

可以的話,就往下進行 val receivedText = frame.readText()

Frame.Text.readText() 則是

/**
 * Reads text content from the text frame.
 * Shouldn't be used for fragmented frames: such frames need to be reassembled first.
 */
public fun Frame.Text.readText(): String {
    require(fin) { "Text could be only extracted from non-fragmented frame" }
    return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) })
}

這邊使用了 CharsetDecoderdataByteArray 轉換成 String

這邊使用了一個 Kotlin 開發時的小技巧 require

/**
 * Throws an [IllegalArgumentException] with the result of calling [lazyMessage] if the [value] is false.
 *
 * @sample samples.misc.Preconditions.failRequireWithLazyMessage
 */
@kotlin.internal.InlineOnly
public inline fun require(value: Boolean, lazyMessage: () -> Any): Unit {
    contract {
        returns() implies value
    }
    if (!value) {
        val message = lazyMessage()
        throw IllegalArgumentException(message.toString())
    }
}

這可以在進入函數之前先檢查,如果不符合條件拋出 IllegalArgumentException

收到了轉成 StringreceivedText,接著是

if (receivedText.equals("bye", ignoreCase = true)) {
	close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} 

這邊的 CloseReason.Codes.NORMAL

public enum class Codes(public val code: Short) {
	NORMAL(1000),
}

我們可以順便看看其他的 CloseReason.Codes

/**
 * Standard close reason codes
 *
 * see https://tools.ietf.org/html/rfc6455#section-7.4 for list of codes
 */
@Suppress("KDocMissingDocumentation")
public enum class Codes(public val code: Short) {
	NORMAL(1000),
	GOING_AWAY(1001),
	PROTOCOL_ERROR(1002),
	CANNOT_ACCEPT(1003),

	@InternalAPI
	@Deprecated("This code MUST NOT be set as a status code in a Close control frame by an endpoint")
	CLOSED_ABNORMALLY(1006),
	NOT_CONSISTENT(1007),
	VIOLATED_POLICY(1008),
	TOO_BIG(1009),
	NO_EXTENSION(1010),
	INTERNAL_ERROR(1011),
	SERVICE_RESTART(1012),
	TRY_AGAIN_LATER(1013);

	public companion object {
		private val byCodeMap = values().associateBy { it.code }

		@Deprecated(
			"Use INTERNAL_ERROR instead.",
			ReplaceWith(
				"INTERNAL_ERROR",
				"io.ktor.websocket.CloseReason.Codes.INTERNAL_ERROR"
			)
		)
		@JvmField
		@Suppress("UNUSED")
		public val UNEXPECTED_CONDITION: Codes = INTERNAL_ERROR

		/**
		 * Get enum value by close reason code
		 * @return enum instance or null if [code] is not in standard
		 */
		public fun byCode(code: Short): Codes? = byCodeMap[code]
	}
}

一樣是依照 RFC 6455 定義出來的。

然後就可以建立出 CloseReason 物件

/**
 * A WebSocket close reason.
 * @property code - close reason code as per RFC 6455, recommended to be one of [CloseReason.Codes]
 * @property message - a close reason message, could be empty
 */
public data class CloseReason(val code: Short, val message: String) {
    public constructor(code: Codes, message: String) : this(code.code, message)

並且透過 close 送出關閉連線訊息

/**
 * Sends a close frame with the specified [reason]. May suspend if the outgoing channel is full.
 * The specified [reason] could be ignored if there was already
 * close frame sent (for example in reply to a peer close frame). It also may do nothing when a session or an outgoing
 * channel is already closed due to any reason.
 */
public suspend fun WebSocketSession.close(reason: CloseReason = CloseReason(CloseReason.Codes.NORMAL, "")) {
    try {
        send(Frame.Close(reason))
        flush()
    } catch (_: Throwable) {
    }
}

這邊的 WebSocketSession.flush 定義是

/**
 * Flushes all outstanding messages and suspends until all earlier sent messages will be written.
 * Could be called at any time even after close. May return immediately if the connection is already terminated.
 * However, it may also fail with an exception (or cancellation) at any point due to a session failure.
 * Note that [flush] doesn't guarantee that frames were actually delivered.
 */
public suspend fun flush()

實作的部分,我們可以參考 RawWebSocketJvm.flush

override suspend fun flush(): Unit = writer.flush()

如果沒有 close 的話,就會持續的運作 send(Frame.Text("Hi, $receivedText!"))

到這邊,我們簡單的看過了

send("Please enter your name")
for (frame in incoming) {
	frame as? Frame.Text ?: continue
	val receivedText = frame.readText()
	if (receivedText.equals("bye", ignoreCase = true)) {
		close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
	} else {
		send(Frame.Text("Hi, $receivedText!"))
	}
}

這一整段背後的實作。

明天我們來看看 Frame 是怎麼被架構出來的


上一篇
Day 22:call.respondWebSocketRaw 的實作以及 RawWebSocket
下一篇
Day 24:針對 WebSockets Frame 的實作細節
系列文
深入解析 Kotlin 專案 Ktor 的程式碼,探索 Ktor 的強大功能30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言